package com.chatplus.application.web.notification.redis;

import com.chatplus.application.common.logging.SouthernQuietLogger;
import com.chatplus.application.common.logging.SouthernQuietLoggerFactory;
import com.chatplus.application.dao.framework.PendingNotificationDao;
import com.chatplus.application.web.notification.NotificationListener;
import com.chatplus.application.web.notification.RedisStreamListener;
import lombok.Data;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.springframework.stereotype.Component;
import org.springframework.util.ReflectionUtils;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;

@Component
public class RedisStreamManager {
    private static final SouthernQuietLogger LOGGER = SouthernQuietLoggerFactory.getLogger(RedisStreamManager.class);

    private final List<ListenerEndpoint> listenerEndpoints = new ArrayList<>();
    private final ApplicationContext applicationContext;
    private final PendingNotificationDao pendingNotificationDao;
    private static final Predicate<Throwable> neverCancelSub = t -> false;

    public RedisStreamManager(ApplicationContext applicationContext,
                              PendingNotificationDao pendingNotificationDao) {
        this.applicationContext = applicationContext;
        this.pendingNotificationDao = pendingNotificationDao;
        initAllListeners();
    }

    @Bean(initMethod = "start", destroyMethod = "stop")
    public StreamMessageListenerContainer<String, MapRecord<String, String, String>> stringMapRecordStreamMessageListenerContainer(RedisConnectionFactory factory, StringRedisTemplate redisTemplate) {
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions =
                StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                        .builder()
                        // 一次最多获取多少条，如果超过5条未处理，可能会堵塞
                        .batchSize(5)
                        // 设置了轮询超时的时间为1秒。这意味着当没有新的消息时，容器将每隔1秒进行一次轮询。
                        .pollTimeout(Duration.ofSeconds(1))
                        .build();
        StreamMessageListenerContainer<String, MapRecord<String, String, String>> container =
                StreamMessageListenerContainer.create(factory,
                        containerOptions);
        if (CollectionUtils.isEmpty(listenerEndpoints)) {
            return container;
        }
        StreamOperations<String, Object, Object> streamOperations = redisTemplate.opsForStream();
        String consumerGroup = "group-chat-plus";
        for (ListenerEndpoint listenerEndpoint : listenerEndpoints) {
            String streamKey = listenerEndpoint.getStreamKey();
            try {
                streamOperations.createGroup(streamKey, ReadOffset.from("0"), consumerGroup);
            } catch (Exception ignored) {
                // 流可能已存在，忽略异常
            }
        }
        for (ListenerEndpoint listenerEndpoint : listenerEndpoints) {
            String streamKey = listenerEndpoint.getStreamKey();
            container.register(StreamMessageListenerContainer.StreamReadRequest
                    .builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
                    .cancelOnError(neverCancelSub) // 发生异常时不cancel,避免redis流被关闭
                    .consumer(Consumer.from(consumerGroup, streamKey))
                    .autoAcknowledge(true) // 手动确认ack
                    .build(), listenerEndpoint.streamListener);

        }
        return container;
    }

    public void initAllListeners() {
        Arrays.stream(applicationContext.getBeanDefinitionNames())
                .map(name -> {
                    try {
                        return applicationContext.getBean(name);
                    } catch (BeansException e) {
                        LOGGER.message("查找NotificationListener时，bean未能初始化").context("name", name).info();
                        return null;
                    }
                })
                .filter(Objects::nonNull)
                .forEach(bean ->
                        Arrays.stream(ReflectionUtils.getAllDeclaredMethods(bean.getClass()))
                                .forEach(method -> AnnotatedElementUtils.getMergedRepeatableAnnotations(method, NotificationListener.class)
                                        .forEach(listener -> {
                                            String listenerName = listener.notification().getSimpleName();
                                            boolean success = initListener(listener, listenerName, bean, method);
                                            if (success) {
                                                LOGGER.message("找到NotificationListener")
                                                        .context(context -> {
                                                            context.put("notification", listener.notification().getSimpleName());
                                                            context.put("listenerName", listenerName);
                                                            context.put("bean", bean.getClass().getSimpleName());
                                                            context.put("method", method.getName());
                                                        }).info();
                                            }
                                        })
                                )
                );
    }

    protected boolean initListener(NotificationListener listener, String streamKey, Object bean, Method method) {
        //如果listenerEndpoints已经存在listenerRouting，则不继续创建了
        boolean isPresent = listenerEndpoints.stream()
                .anyMatch(endpoint -> streamKey.equals(endpoint.getStreamKey()));
        if (isPresent) {
            return false;
        }
        listenerEndpoints.stream()
                .filter(listenerEndpoint -> listener.notification() == listenerEndpoint.getListenerAnnotation().notification()
                        && streamKey.equals(listenerEndpoint.getStreamKey()))
                .findAny()
                .ifPresent(listenerEndpoint -> {
                    LOGGER.message("监听器重复")
                            .context(context -> {
                                context.put("listener", bean.getClass().getName());
                                context.put("listenerName", streamKey);
                                context.put("notification", listener.notification().getSimpleName());
                            })
                            .warn();
                    System.exit(-1);
                });
        ListenerEndpoint listenerEndpoint = new ListenerEndpoint();
        listenerEndpoint.setStreamKey(streamKey);
        listenerEndpoint.setStreamListener(new RedisStreamListener(listener, bean, method, pendingNotificationDao));
        listenerEndpoint.setListenerAnnotation(listener);
        listenerEndpoints.add(listenerEndpoint);
        return true;
    }

    @Data
    private static class ListenerEndpoint {
        private NotificationListener listenerAnnotation;
        private RedisStreamListener streamListener;
        private String streamKey;
    }
}
